{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### How to transform large text files into Parquet files\n",
    "\n",
    "This file needs to be run on AWS, and we need to make sure that the dbutils code works.\n",
    "\n",
    "The weather data, as text, is about 7GB, after some organization, stored in 94 files, each of size about 85MB.\n",
    "\n",
    "Initially, I used the following code to read in the data. Even though I used a reasonably large cluster, with about 100GB of memory, 5 worker nodes and 20 cores.\n",
    "\n",
    "It took about 106 minutes (one hour and 46 minutes) to complete this job! Why?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "For this first exercise, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million **nework interactions**. First, download and read the gzip file:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "#start the SparkContext\n",
    "#import findspark\n",
    "#findspark.init()\n",
    "\n",
    "from pyspark import SparkContext\n",
    "sc = SparkContext(master=\"local[3]\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "494021\n",
      "['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.']\n"
     ]
    }
   ],
   "source": [
    "import urllib.request, urllib.parse, urllib.error\n",
    "f = urllib.request.urlretrieve (\"http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz\", \"kddcup.data_10_percent.gz\")\n",
    "data_file = \"./kddcup.data_10_percent.gz\"\n",
    "raw_data = sc.textFile(data_file)\n",
    "print(raw_data.count())\n",
    "print(raw_data.take(1))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "```python\n",
    "from pyspark.sql import Row\n",
    "import numpy as np\n",
    "# 1. Split each line using comma,\n",
    "# 2. remove first line (which is the names of each column)\n",
    "# 3. sort the data based on the \"year\" attribute\n",
    "\n",
    "path = '/mnt/NCDC-weather/WeatherUncompressed/'\n",
    "file_list=dbutils.fs.ls(path)\n",
    "dataRows=range(len(file_list))\n",
    "dataFrames=range(len(file_list))\n",
    "\n",
    "for i in range(len(file_list)):\n",
    "    filename=file_list[i].path\n",
    "    data = sc.textFile(filename)\n",
    "    dataRows[i] = data.map(lambda s: s.split(',')) \\\n",
    "               .filter(lambda d: d[0] != 'station') \\\n",
    "               .filter(lambda d:len(d)==368)\\\n",
    "               .map(lambda d: tuple(d[0:2]) + tuple([convert(x) for x in d[2:]])) \\\n",
    "               .sortBy(lambda d: d[2])\n",
    "    dataFrames[i] = sqlContext.createDataFrame(dataRows[i], index)\n",
    "    if i==0:\n",
    "      combinedDataFrame=dataFrames[i]\n",
    "    else:\n",
    "      combinedDataFrame=combinedDataFrame.unionAll(dataFrames[i])\n",
    "    print filename\n",
    "    print combinedDataFrame.count()\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "The reason that this code is so slow is that the for loop which iterates over the files, while quite sensible for a single computer, is a very bad idea when using a cluster with 20 cores. It forces the cluster to read one file at a time, which means that at each point of time only one file is being read.\n",
    "\n",
    "A simpler code, shown below, finished the same task in 8 minutes!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "```python\n",
    "from pyspark.sql import Row\n",
    "import numpy as np\n",
    "# 1. Split each line using comma,\n",
    "# 2. remove first line (which is the names of each column)\n",
    "# 3. sort the data based on the \"year\" attribute\n",
    "\n",
    "dataRows=range(len(file_list))\n",
    "dataFrames=range(len(file_list))\n",
    "\n",
    "path = '/mnt/NCDC-weather/WeatherUncompressed/'\n",
    "data = sc.textFile(path)\n",
    "dataRows = data.map(lambda s: s.split(',')) \\\n",
    "               .filter(lambda d: d[0] != 'station') \\\n",
    "               .filter(lambda d:len(d)==368)\\\n",
    "               .map(lambda d: tuple(d[0:2]) + tuple([convert(x) for x in d[2:]])) \\\n",
    "               .sortBy(lambda d: d[2])\n",
    "dataFrame = sqlContext.createDataFrame(dataRows, index)\n",
    "```"
   ]
  },
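  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "Both versions of the code call a `convert` helper and pass an `index` schema, neither of which is shown in this notebook. As a rough illustration only, here is a minimal sketch of what the per-line parsing might look like in plain Python. The body of `convert` (floats, with NaN for empty or unparseable fields), the `parse_line` wrapper, and the sample station and measurement values are assumptions made for this example, not the code that was actually run.\n",
    "\n",
    "```python\n",
    "import math\n",
    "\n",
    "EXPECTED_FIELDS = 368  # field count checked by the filter in the code above\n",
    "\n",
    "\n",
    "def convert(x):\n",
    "    # Hypothetical stand-in for the real helper: numeric text becomes a float,\n",
    "    # empty or unparseable text becomes NaN.\n",
    "    try:\n",
    "        return float(x)\n",
    "    except ValueError:\n",
    "        return float('nan')\n",
    "\n",
    "\n",
    "def parse_line(line):\n",
    "    # Same split / filter / convert steps as the Spark pipeline, for one line.\n",
    "    # Returns None for header lines and for rows with the wrong field count.\n",
    "    d = line.split(',')\n",
    "    if d[0] == 'station' or len(d) != EXPECTED_FIELDS:\n",
    "        return None\n",
    "    return tuple(d[0:2]) + tuple(convert(x) for x in d[2:])\n",
    "\n",
    "\n",
    "# Made-up row: two string fields, then 366 fields passed through convert\n",
    "fields = ['STN001', 'TMAX', '1950'] + ['12'] * 364 + ['']\n",
    "row = parse_line(','.join(fields))\n",
    "header = parse_line('station,measurement,year')\n",
    "```\n",
    "\n",
    "A function like this makes it possible to check the `filter` and `map` logic on a handful of lines locally before running the full job on the cluster."
   ]
  },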
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "The resulting Parquet files are 4GB (almost half the original 7GB).\n",
    "\n",
    "```python\n",
    "dataFrame.write.parquet(\"/mnt/NCDC-weather/Weather.parquet\")\n",
    "\n",
    "# the smaller file that we use in class was generated as follows:\n",
    "\n",
    "sampled_df=dataFrame.sample(False,0.001)\n",
    "sampled_df.write.parquet(\"/mnt/NCDC-weather/Weather_sampled.parquet\")\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "celltoolbar": "Slideshow",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": true,
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
